package com.ysx.search.service.impl;

import com.alibaba.fastjson.JSON;
import com.ysx.common.constants.BusinessConstants;
import com.ysx.common.pojo.PageInfo;
import com.ysx.common.utils.RequestHeaderInfoUtils;
import com.ysx.search.document.ArticleInfoDocument;
import com.ysx.search.dto.SearchDto;
import com.ysx.search.repository.ArticleInfoDocumentRepository;
import com.ysx.search.service.ArticleInfoDocumentSearchService;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
public class ArticleInfoDocumentSearchServiceImpl implements ArticleInfoDocumentSearchService {

    /** Keyword used when the caller supplies none. */
    private static final String DEFAULT_KEYWORDS = "华为";
    /** Default and maximum page size. */
    private static final int DEFAULT_PAGE_SIZE = 10;
    /** Elasticsearch index holding the article documents. */
    private static final String ARTICLE_INDEX = "article";
    /** Highlighted field name. */
    private static final String TITLE_FIELD = "title";

    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    @Autowired
    private ArticleInfoDocumentRepository articleInfoDocumentRepository;

    // NOTE(review): raw KafkaTemplate kept as-is so Spring's generic-aware injection is
    // not disturbed; ideally this should be KafkaTemplate<String, String> — confirm the
    // bean's declared generics before tightening.
    @Autowired
    private KafkaTemplate kafkaTemplate;


    //kafka 发送消息 进行异步操作 额外定义一个单独的微服务进行search相关数据库的更新。
    //目前暂时放到一起 即：自己是生产者也是消费者。
    // (kafka message is sent asynchronously; a dedicated microservice should eventually
    //  consume it to update the search-behavior store. For now this service acts as
    //  both producer and consumer.)

    /**
     * Full-text search over article titles with highlighting, paging and
     * publish-time descending sort.
     *
     * @param searchDto keywords + page/size; missing or out-of-range values are
     *                  replaced with defaults (keywords={@value #DEFAULT_KEYWORDS},
     *                  page=1, size={@value #DEFAULT_PAGE_SIZE})
     * @return one page of matching documents; titles carry HTML highlight markup
     *         when Elasticsearch produced fragments
     */
    @Override
    public PageInfo<ArticleInfoDocument> search(SearchDto searchDto) {

        //1. Default the search keyword when none was supplied.
        if (StringUtils.isEmpty(searchDto.getKeywords())) {
            searchDto.setKeywords(DEFAULT_KEYWORDS);
        }
        //2. Normalise paging: page >= 1 and 1 <= size <= DEFAULT_PAGE_SIZE.
        if (searchDto.getPage() == null || searchDto.getPage() <= 0) {
            searchDto.setPage(1);
        }
        // BUGFIX: also reset non-positive sizes — PageRequest.of rejects size < 1.
        if (searchDto.getSize() == null || searchDto.getSize() <= 0
                || searchDto.getSize() > DEFAULT_PAGE_SIZE) {
            searchDto.setSize(DEFAULT_PAGE_SIZE);
        }

        //3. Build the native query: match on title, highlight the matched terms,
        //   page the results and sort newest-first.
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(
                QueryBuilders.matchQuery(TITLE_FIELD, searchDto.getKeywords()));
        nativeSearchQueryBuilder.withHighlightFields(new HighlightBuilder.Field(TITLE_FIELD));
        nativeSearchQueryBuilder.withHighlightBuilder(new HighlightBuilder()
                .preTags("<span style=\"color:red\">").postTags("</span>"));
        // PageRequest is zero-based; the DTO page is one-based.
        nativeSearchQueryBuilder.withPageable(
                PageRequest.of(searchDto.getPage() - 1, searchDto.getSize()));
        nativeSearchQueryBuilder.withSort(
                SortBuilders.fieldSort("publishTime").order(SortOrder.DESC));
        NativeSearchQuery query = nativeSearchQueryBuilder.build();

        //4. Execute and collect hits, substituting highlighted titles when present.
        SearchHits<ArticleInfoDocument> searchHits = elasticsearchRestTemplate.search(
                query, ArticleInfoDocument.class, IndexCoordinates.of(ARTICLE_INDEX));
        long totalHits = searchHits.getTotalHits();
        List<ArticleInfoDocument> list = new ArrayList<>();
        for (SearchHit<ArticleInfoDocument> searchHit : searchHits) {
            ArticleInfoDocument content = searchHit.getContent();
            // BUGFIX: a hit may carry no highlight fragments for "title"; the previous
            // code iterated the null list and threw an NPE.
            List<String> titleFragments = searchHit.getHighlightFields().get(TITLE_FIELD);
            if (titleFragments != null && !titleFragments.isEmpty()) {
                content.setTitle(String.join("", titleFragments));
            }
            list.add(content);
        }

        //5. Page count = ceil(totalHits / size), using primitives (no boxing churn).
        long totalPages = totalHits / searchDto.getSize();
        if (totalHits % searchDto.getSize() > 0) {
            totalPages++;
        }
        // NOTE(review): sendMessage(searchDto) is never invoked although the class
        // comments describe kafka-based behavior tracking — confirm whether the call
        // belongs here before wiring it in.
        return new PageInfo<ArticleInfoDocument>(
                Long.valueOf(searchDto.getPage()), Long.valueOf(searchDto.getSize()),
                totalHits, totalPages, list);
    }


    /**
     * Publishes a search-behavior event to kafka.
     * Payload shape: {type:"0"|"1", keywords:..., userId: user id or equipment id}.
     * type "0" = anonymous (header user id is "0"), userId falls back to the equipment id;
     * type "1" = logged-in user.
     *
     * @param searchDto source of keywords and equipment id
     */
    private void sendMessage(SearchDto searchDto) {
        Map<String, String> messageInfo = new HashMap<>(16);

        String userInfo = RequestHeaderInfoUtils.getHeaderUserId();
        // BUGFIX: constant-first equals — the header util may return null.
        if ("0".equals(userInfo)) {
            messageInfo.put("type", "0");
            // String.valueOf tolerates a null equipment id (previous .toString() NPE'd).
            messageInfo.put("userId", String.valueOf(searchDto.getEquipmentId()));
        } else {
            messageInfo.put("type", "1");
            messageInfo.put("userId", userInfo);
        }
        messageInfo.put("keywords", searchDto.getKeywords());

        try {
            kafkaTemplate.send(BusinessConstants.MqConstants.SEARCH_BEHAVIOR_TOPIC,
                    JSON.toJSONString(messageInfo));
        } catch (Exception e) {
            // Best-effort tracking: a failed send must not break the search itself.
            // TODO(review): replace with an SLF4J logger once one is added to this class.
            e.printStackTrace();
        }
    }


    /**
     * Persists (indexes) an article document via the repository.
     *
     * @param articleInfoDocument document to save; id semantics are the repository's
     */
    @Override
    public void save(ArticleInfoDocument articleInfoDocument) {
        articleInfoDocumentRepository.save(articleInfoDocument);
    }

}
